In [8]:
from __future__ import print_function

import codecs
import csv
import datetime
import glob
import itertools as it
import json
import time
from collections import defaultdict
from StringIO import StringIO

import psycopg2
In [9]:
def cdid_details(table):
    """Processes an array-of-arrays representing a .csv file.

    We need the name of the cdid so that the user can search
    for it. Also capture the "attr=value" metadata rows that
    follow each cdid.

    Returns a dict mapping each 4-character CDID to a defaultdict
    of its attributes; missing attributes read as None.
    """
    # The file is broken into two sections, the data table and
    # a second table of data on individual cdids. A copyright notice
    # divides the two.
    cdiddata = it.dropwhile(lambda row: 'Crown' not in row[0], table)
    cdiddata = list(cdiddata)[1:]  # drop the copyright row itself
    data = {}
    current = None
    # repeating format:
    # cdid | name
    #      | attr1=value
    #      | attr2=value
    # etc...
    for line in cdiddata:
        cdid = line[0]
        if cdid:
            # `dict.has_key` was removed in Python 3; `in` works everywhere
            assert cdid not in data
            assert len(cdid) == 4
            current = defaultdict(lambda: None)
            data[cdid] = current
            current['name'] = line[1]
        else:
            # split on the first '=' only, so values may themselves
            # contain '=' without raising ValueError
            name, val = line[1].split('=', 1)
            current[name] = val
    return data
def parse_csv(fname, enc = 'latin1'):
    """Read `fname`, transcode it from `enc` to utf-8, and return its
    non-empty rows as a list of row-lists.

    NOTE(review): Python 2 only -- relies on str.decode/encode round-trip
    and the old StringIO module.
    """
    # Read the whole file, then decode once.  The original decoded each
    # 1 MB chunk independently, which can split a multibyte character at
    # a chunk boundary for non-single-byte encodings, and built the
    # result with repeated `+=` (quadratic on large files).
    with open(fname, 'rb') as fl:
        raw = fl.read()
    io = StringIO(raw.decode(enc).encode('utf-8'))
    return [row for row in csv.reader(io) if row]
In [10]:
def dates_of_type(n, col):
    """Keep the (date, value) pairs of `col` whose date label is `n`
    characters long, then trim leading and trailing pairs that have no
    value.  Returns [] when no pair carries a value.
    """
    matching = [entry for entry in col if len(entry[0]) == n]
    # advance past empty values at the front
    start = 0
    while start < len(matching) and not matching[start][1]:
        start += 1
    # back off past empty values at the rear
    end = len(matching)
    while end > start and not matching[end - 1][1]:
        end -= 1
    return matching[start:end]
def extract_columns(table):
    """Each column is combined with the date information
    and indexed by cdid; the data will be stored this
    way in the database, as json, to avoid a 30 million
    row SQL table.
    """
    # keep only the data section; the cdid metadata after the Crown
    # copyright row is handled separately by cdid_details()
    table = list(it.takewhile(lambda x: 'Crown' not in x[0], table))
    dates = [row[0] for row in table][1:]
    cdids = table[0][1:]
    # transpose the table body into one sequence per cdid column
    data = zip(*[row[1:] for row in table[1:]])
    def by_datetype(col):
        # split a column into yearly / quarterly / monthly series
        yearly, quarterly, monthly = 4, 7, 8 # '2010', '2010 Q1', '2010 JAN'
        return [dates_of_type(n, col) for n in [yearly, quarterly, monthly]]
    # materialise the pairing: by_datetype iterates `col` three times,
    # which would silently return empty results for the second and third
    # pass if `col` were a one-shot iterator (Python 3 zip)
    columns = {cdid:by_datetype(list(zip(dates, column)))
               for cdid, column in zip(cdids, data)}
    return columns
In [11]:
def insert_cdids():
    """Parse every dataset .csv and insert one row per CDID into the
    `cdids` table as (cdid, price, seasonal_adjustment, name).
    """
    # glob instead of IPython's `!ls` so the function is plain Python;
    # sorted() reproduces ls's alphabetical ordering.
    csvs = sorted(glob.glob('onsdata2/*.csv'))
    cdidattrs = {}
    for f in csvs:
        d = cdid_details(parse_csv(f))
        assert d
        # later files win when a CDID appears in more than one dataset
        cdidattrs.update(d)
    # NOTE(review): hardcoded local dev credentials -- move to environment
    # variables before sharing this notebook.
    conn = psycopg2.connect("dbname=onsstat user=holdem password=holdem host=127.0.0.1")
    cur = conn.cursor()
    try:
        for cdid, attrs in cdidattrs.items():
            price, seasonal, name = attrs['price'], attrs['seasonal_adjustment'], attrs['name']
            cur.execute('INSERT INTO cdids VALUES (%s, %s, %s, %s)', (cdid, price, seasonal, name))
        conn.commit()
    finally:
        cur.close()
        conn.close()
In [12]:
def datasetinfo(fname):
    """ '<directory>/<title>(<id>).csv' -> id, title """
    # title starts after the first '/' (index 0 when there is none)
    title_start = fname.find('/') + 1
    open_paren = fname.rfind('(')
    close_paren = fname.rfind(')')
    dataset_id = fname[open_paren + 1:close_paren]
    title = fname[title_start:open_paren]
    return dataset_id, title
def insert_datasets():
    """Insert one (id, title) row per dataset .csv into the `datasets`
    table, deriving both fields from the file name via datasetinfo().
    """
    # glob instead of IPython's `!ls` so the function is plain Python;
    # sorted() reproduces ls's alphabetical ordering.
    csvs = sorted(glob.glob('onsdata2/*.csv'))
    # NOTE(review): hardcoded local dev credentials -- move to environment
    # variables before sharing this notebook.
    conn = psycopg2.connect("dbname=onsstat user=holdem password=holdem host=127.0.0.1")
    cur = conn.cursor()
    try:
        for csvfile in csvs:
            cur.execute('INSERT INTO datasets VALUES (%s, %s)', datasetinfo(csvfile))
        conn.commit()
    finally:
        cur.close()
        conn.close()
In [13]:
def save_columndata():
    """Saving the data on columns to a file to be read
    into the database using COPY for performance.

    Writes tab-separated rows of
    (cdid, dataset_id, base_period, index_period, json column data).
    """
    # glob instead of IPython's `!ls`; exclude a leftover columndata.csv
    # so a re-run never tries to parse its own previous output.
    csvs = sorted(f for f in glob.glob('onsdata2/*.csv')
                  if not f.endswith('columndata.csv'))
    # Write into onsdata2/: both the SQL COPY statement and the cleanup
    # `!rm` expect onsdata2/columndata.csv, but the original wrote the
    # file to the working directory instead.
    with open('onsdata2/columndata.csv', 'w') as out:
        for csvfile in csvs:
            table = parse_csv(csvfile)
            cdid_attrs = cdid_details(table)
            if not cdid_attrs:
                print(csvfile)
                return
            # we can do this because the data is repeated ad nauseum
            try:
                # list() keeps the IndexError contract on Python 3,
                # where dict.values() is a view, not a list
                attrs = list(cdid_attrs.values())[0]
            except IndexError:
                print(csvfile, table)
                raise  # bare raise preserves the original traceback
            base, index = attrs['base_period'], attrs['index_period']
            # strip the single quotes some files wrap around periods
            if base[0] == "'":
                base = base[1:-1]
            if index[0] == "'":
                index = index[1:-1]
            d_id, _ = datasetinfo(csvfile)
            columns = extract_columns(table)
            for cdid, column in columns.items():
                out.write("%s\t%s\t%s\t%s\t%s\n" %
                          (cdid, d_id, base, index, json.dumps(column)))
            out.flush()
-- Rebuild the columndata table, then bulk-load the file produced by
-- save_columndata().
drop table columndata;
-- One row per (cdid, dataset) pair; the series values themselves are
-- stored as a JSON blob in "column" to avoid a ~30-million-row table.
CREATE TABLE columndata ( cdid character(4) NOT NULL, dataset_id integer NOT NULL, base_period character varying(16), index_period character varying(16), "column" text NOT NULL, CONSTRAINT datacolumn_pkey PRIMARY KEY (cdid, dataset_id), CONSTRAINT datacolumn_dataset_id_fkey FOREIGN KEY (dataset_id) REFERENCES datasets (id) MATCH SIMPLE ON UPDATE NO ACTION ON DELETE NO ACTION ) WITH ( OIDS=FALSE ); ALTER TABLE columndata OWNER TO holdem;
-- COPY's default text format matches the tab-separated lines the
-- notebook writes.  NOTE(review): absolute local path -- adjust per host.
copy columndata from '/home/jamougha/workspace/onsstat/onsdata2/columndata.csv';
In [14]:
# Remove any stale export, then regenerate it from the dataset .csv files.
!rm onsdata2/columndata.csv
save_columndata()
In [7]:
In [7]:
In [7]:
In [7]:
In [7]:
In [7]:
In [7]:
In [7]:
In [ ]: